
from numpy import int64, mat
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from dask.distributed import Client
import numpy as np
import gc

# Register a dask progress bar globally so dask computations triggered later
# in this script (e.g. the .compute() call) report their progress.
ProgressBar().register()


def convert_strings(x):
    """Convert an identifier-like value to an integer, or NaN if impossible.

    The value is first parsed as a float and then truncated to ``int``, so
    inputs such as ``"123.0"`` or ``123.9`` become ``123``.

    Args:
        x: Any scalar (string, number, ``None``, NaN, ...).

    Returns:
        ``int`` when ``x`` can be parsed as a finite number, otherwise
        ``numpy.nan``.
    """
    try:
        return int(float(x))
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-numeric text, or NaN (int(nan) is not defined).
        # TypeError: None and other objects float() cannot handle.
        # OverflowError: +/- infinity cannot be converted to int.
        return np.nan


# Load the 2014 and 2019 RAIS extracts lazily with dask. The identifier
# columns are read as strings ('object') and converted explicitly below.
RAIS_DTYPES = {'radiccnpj': 'object',
               'numectps': 'object'}

rais_2014 = dd.read_csv('rais/rais_2014.csv', dtype=RAIS_DTYPES)

rais_2019 = dd.read_csv('rais/rais_2019.csv', dtype=RAIS_DTYPES)

rais_2014 = rais_2014.assign(year=2014)

rais_2019 = rais_2019.assign(year=2019)

# DataFrame.append is deprecated/removed; concat stacks the two years.
rais = dd.concat([rais_2014, rais_2019])

# filter public employees and entries not corresponding to the year-end
# contract information

# `~` is the boolean negation operator (unary `-` is not meant for boolean
# masks); na=False treats a missing tpvinculo as "does not contain 'EST'"
# instead of producing a NaN mask that cannot be used for filtering.
rais = rais[~rais['tpvinculo'].str.contains('EST', na=False)]

rais = rais[rais['empem3112'] == "Sim"]

# get first three digits of clascnae20

# meta tells dask the output name/dtype up front, so it does not have to
# infer it by running the function on dummy data.
rais['clascnae20_3d'] = rais['clascnae20'].apply(
    lambda x: str(x)[:3], meta=('clascnae20', 'object'))

rais = rais[['municipio', 'clascnae20_3d', 'remmedia', 'raca_cor',
             'ocup2002', 'CPF', 'year', 'radiccnpj', 'grinstrucao', 'tempempr']]

# convert_strings yields int or NaN, hence a float64 column overall.
rais['radiccnpj'] = rais['radiccnpj'].apply(
    convert_strings, meta=('radiccnpj', 'float64'))

# Materialise everything as an in-memory pandas DataFrame; all later steps
# use pandas.
rais = rais.compute()

# Reference medians of remmedia, computed per year on the full filtered
# sample (before any matching). The built-in groupby median skips NaN just
# like np.nanmedian, but avoids a Python-level call per group.
# NOTE(review): assumes remmedia is numeric -- confirm against the CSV schema.

# industry median

industry_median = (
    rais.groupby(['clascnae20_3d', 'year'])['remmedia']
    .median()
    .reset_index()
)

# occupation median

occup_wage = (
    rais.groupby(['ocup2002', 'year'])['remmedia']
    .median()
    .reset_index()
)

# educational group median

educ_wage = (
    rais.groupby(['grinstrucao', 'year'])['remmedia']
    .median()
    .reset_index()
)

print("Matching firms")

# rais_match.py creates this file

rais_match = pd.read_csv('rais_matched_test_clt.csv')

# One row per distinct firm identifier found in the matched sample.
matched_firms = pd.DataFrame(
    {'radiccnpj': rais_match['radiccnpj'].dropna().unique()})

matched_firms['radiccnpj'] = matched_firms['radiccnpj'].astype(object).apply(
    convert_strings)

# convert_strings can return NaN for unparseable ids and can map distinct raw
# values (e.g. "123" and "123.0") to the same integer. pandas matches null
# keys against each other and repeats rows for duplicated keys, so both must
# be removed before merging.
matched_firms = (
    matched_firms
    .dropna(subset=['radiccnpj'])
    .drop_duplicates(subset=['radiccnpj'])
    .assign(matched_firm=1)
)

# Inner merge: keep only RAIS records belonging to a matched firm.
rais_matched_firms = pd.merge(rais, matched_firms, on='radiccnpj')

# matched workers

print("Matching workers")

# One row per distinct worker identifier found in the matched sample.
matched_workers = pd.DataFrame(
    {'CPF': rais_match['CPF'].dropna().unique()})

matched_workers['CPF'] = matched_workers['CPF'].astype(object).apply(
    convert_strings)

# convert_strings can return NaN for unparseable ids and can map distinct raw
# values to the same integer. pandas matches null keys against each other
# and repeats rows for duplicated keys, so both must be removed before
# merging.
matched_workers = (
    matched_workers
    .dropna(subset=['CPF'])
    .drop_duplicates(subset=['CPF'])
    .assign(matched_cpf=1)
)

rais = rais.dropna(subset=['CPF'])

rais['CPF'] = rais['CPF'].apply(convert_strings)

# Inner merge: keep only RAIS records belonging to a matched worker.
rais_matched_workers = pd.merge(rais, matched_workers, on='CPF')

# About 6% of cases have more than one entry. Some are exact duplicates;
# others are instances in which the employment record is updated throughout
# the year (happens with new workers). Since tempempr is updated throughout
# the year, ordering by this variable and keeping the highest value returns
# the values at year's end.

# Even at the highest value of tempempr (e.g. 11.8 months since first
# contracted) the same contract can repeat with identical values, so
# duplicates are dropped after discarding the earlier-than-December entries.

rais_matched_firms = rais_matched_firms.reset_index()

# na_position='first' keeps rows with a missing tempempr from sorting to the
# end, where keep='last' would prefer them over the real year-end record.
# NOTE(review): assumes tempempr is numeric; if it is text the sort is
# lexicographic -- confirm.
rais_matched_firms = rais_matched_firms.sort_values(
    'tempempr', na_position='first'
).drop_duplicates(['CPF', 'radiccnpj', 'year'], keep='last')

gc.collect()

# there are a few thousand cases in which the CNPJ is 0

rais_matched_firms = rais_matched_firms[rais_matched_firms['radiccnpj'] != 0]

# Original author's notes: the data is computed into pandas because quantile
# within groupby is not implemented in dask; be careful, that computation
# takes a long time (one recorded run: 49min 18.9s).

# firm characteristics

# Firm median wage relative to its industry median. Despite the "ratio"
# wording in the progress message, the stored value is a difference.

print("Firm to industry ratio")

gc.collect()

industry_median.columns = ['clascnae20_3d', 'year', 'remmediaind']

firm_keys = ['radiccnpj', 'clascnae20_3d', 'year']

# Built-in groupby median skips NaN like np.nanmedian but is vectorised.
firm_median = (
    rais_matched_firms.groupby(firm_keys)['remmedia']
    .median()
    .reset_index()
)

firm_median = pd.merge(
    firm_median, industry_median, on=['clascnae20_3d', 'year'], how='left')

firm_median['remmedia_dif_industry'] = (
    firm_median['remmedia'] - firm_median['remmediaind'])

# firm median

firm_median = firm_median.rename(columns={'remmedia': 'medianfirm'})

# Both firm-level columns share the same keys, so attach them to the worker
# records in a single left merge (same resulting columns, in the same order,
# as merging them one after the other).
rais_matched_workers = pd.merge(
    rais_matched_workers,
    firm_median[firm_keys + ['remmedia_dif_industry', 'medianfirm']],
    on=firm_keys,
    how='left')

# worker characteristics

# Worker wage relative to the median of the worker's occupational group
# (stored as a difference: remmedia - remmediaocup).

print("Worker to occupation ratio")

occup_wage = occup_wage.rename(columns={'remmedia': 'remmediaocup'})

rais_matched_workers = rais_matched_workers.merge(
    occup_wage, on=['ocup2002', 'year'], how='left')

rais_matched_workers['remmedia_dif_ocup'] = (
    rais_matched_workers['remmedia'] - rais_matched_workers['remmediaocup'])

# Worker wage relative to the median of the worker's educational group
# (stored as a difference: remmedia - remmediaeduc).

educ_wage = educ_wage.rename(columns={'remmedia': 'remmediaeduc'})

rais_matched_workers = rais_matched_workers.merge(
    educ_wage, on=['grinstrucao', 'year'], how='left')

rais_matched_workers['remmedia_dif_educ'] = (
    rais_matched_workers['remmedia'] - rais_matched_workers['remmediaeduc'])

# Original note: final step in which only the occupation with the highest
# wage is left for each worker; careful, this is slow and takes lots of
# memory.
# NOTE(review): no code in this section performs that reduction -- the
# worker table is written out as it stands.

print("Computing all...")

gc.collect()

print("Saving...")

output_path = 'data/rais_matched_firm_out.csv'
rais_matched_workers.to_csv(output_path, index=False)
